package com.changkang.webim.mqtt;





import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

import com.changkang.webim.common.QosType;
import com.jfinal.core.JFinal;
import com.jfinal.kit.LogKit;
import com.jfinal.kit.PropKit;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.IPlugin;

/**
 * 自定义mqtt插件
 * @author 常康
 *
 */
public class MqttPlugin implements  IPlugin{

	private  MqttClient client;//异步MQTT Client

	private MqttConnectOptions options;//MQTT Cleint连接配置

	private String brokerURL;//MQTT服务器连接地址
	
	private boolean manualAcks = false;//是否手动应答    //默认为false
	
	private String clientId="";//当前客户端ID

	private String userName;//连接MQTT额用户名

	private String password;//连接MQTT额密码

	private boolean automaticReconnection = false; //是否自动重连 默认诶false

	private boolean cleanSession = true;//是否保持session，true无法接受离线消息， false可以接受离线消息

	private int connectionTimeout = 30;//连接超时时间

	private int keepAliveInterval = 60;//心跳时间

	private String stroageDir;// 默认存储在内存中，如果设置了stroageDir，则可以将mqtt的消息序列化指定的目录之下
	
	private String willTopic;//将遗嘱消息发布到那个主题

	private String willPayload;//遗嘱消息内容
    
	private String defaultTopic;
	
	private int willQos = QosType.QOS_EXACTLY_ONCE.type();//遗嘱消息内容

	private boolean willretained = false;//是否保留这个遗嘱消息
	
	public MqttPlugin() {
		this.brokerURL=PropKit.get("mqtt.brokerURL");
		this.userName=PropKit.get("mqtt.userName");
		this.password=PropKit.get("mqtt.password");
		this.automaticReconnection=PropKit.getBoolean("mqtt.automaticReconnection");
		this.cleanSession=PropKit.getBoolean("mqtt.cleanSession");
		this.keepAliveInterval=PropKit.getInt("mqtt.keepAliveInterval");
		this.stroageDir=PropKit.get("mqtt.stroageDir");
		this.willTopic=PropKit.get("mqtt.willTopic");
		this.willPayload=PropKit.get("mqtt.willPayload");
		this.defaultTopic=PropKit.get("mqtt.serverTopic");	
		this.connectionTimeout=PropKit.getInt("mqtt.connectionTimeout");	
	}

	@Override
	public boolean start() {
		
		//获取实例化mqtt client
				try {
					if(StrKit.isBlank(this.stroageDir)){
						this.client = new MqttClient("tcp://"+this.brokerURL+":1883", this.clientId);
					}else{
						this.client = new MqttClient("tcp://"+this.brokerURL+":1883", this.clientId, new MqttDefaultFilePersistence(this.stroageDir));
					}
				} catch (MqttException e) {
					LogKit.error("MQTT Client Plugin 初始化Client失败",e);
					return false;
				}
				
				this.client.setManualAcks(this.manualAcks);
				this.client.setCallback(new MqttMessageCallback());
				this.options = new MqttConnectOptions();
				this.options.setAutomaticReconnect(this.automaticReconnection);
				this.options.setCleanSession(this.cleanSession);
				this.options.setConnectionTimeout(this.connectionTimeout);
				this.options.setKeepAliveInterval(this.keepAliveInterval);	
			    this.options.setConnectionTimeout(this.connectionTimeout);
			    
				//设置用户名密码
				if(StrKit.notBlank(this.userName) && StrKit.notBlank(this.password)){
					this.options.setUserName(this.userName);
					this.options.setPassword(this.password.toCharArray());
				}
				
				//设置遗嘱消息
				if(StrKit.notBlank(this.willTopic) && StrKit.notBlank(this.willPayload)){
					this.options.setWill(this.willTopic, this.willPayload.getBytes(), this.willQos, this.willretained);
				}
				
				try {
					this.client.connect(this.options);
				} catch (MqttSecurityException e) {
					LogKit.error("MQTT Clinet连接MQTT Broker验证失败,请检查连接配置信息",e);
					return false;
				} catch (MqttException e) {
					LogKit.error("MQTT Clinet连接MQTT Broker失败",e);
					return false;
				}	
				
				//订阅默认主题
				try {
					this.client.subscribe(this.defaultTopic, QosType.QOS_EXACTLY_ONCE.type());
					//将服务器主题放入到application中方便页面获取
					JFinal.me().getServletContext().setAttribute("serverTopic", this.defaultTopic);
					JFinal.me().getServletContext().setAttribute("serverIp", this.brokerURL);
				} catch (MqttException e1) {
					LogKit.error("MQTT Client 订阅默认topic失败",e1);
					return false;
				}
				
				MqttKit.init(this.client);
				
		LogKit.info("MQTT Client 连接成功");
		return true;
	}

	@Override
	public boolean stop() {
		
		if(this.client != null && this.client.isConnected()){
			try {
				this.client.disconnect();
			} catch (MqttException e) {
				LogKit.error("释放MQTT Client连接失败",e);
				return false;
			}
		}
		JFinal.me().getServletContext().removeAttribute("serverTopic");
		LogKit.info("MQTT Client 释放连接成功");
		return true;
	}

}
